#include "config.h"

#include <dirent.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBLSV

#include "fileinfo.h"
#include "md_map.h"
#include "net_global.h"
#include "stor_rpc.h"
#include "vnode.h"
#include "volume_ctl.h"
#include "volume_proto.h"
#include "yid.h"
#include "ypage.h"

#include "lba_lock.h"
#include "lsv_bitmap.h"
#include "lsv_volume.h"
#include "lsv_bitmap_internal.h"
#include "lsv_lib.h"
#include "lsv_rcache.h"
#include "lsv_volume_proto.h"
#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"
#include "row2_bitmap.h"

#include <buffer.h>
#include <stack.h>

#define USE_SPAN_LOCK 1

/**
 * Report the mapping stored for one page and the length of the run of
 * identically-mapped pages that starts there.
 *
 * @param bitmap     bitmap unit whose page_bits table is scanned
 * @param page_idx   first page to inspect; must be a valid page_bits index
 * @param chunk_id   out: chunk id recorded for page_idx
 * @param vvol_id    out: virtual-volume id recorded for page_idx
 * @param n_repeats  out: number of consecutive pages, starting at page_idx,
 *                   that carry the same (chunk_id, vvol_id) pair; at least 1
 * @return always 1
 */
int row_bitmap_page_get_extents(row2_bitmap_unit_t *bitmap, uint32_t page_idx, uint32_t *chunk_id, uint32_t *vvol_id, uint32_t *n_repeats)
{
        /* Entries from page_idx (inclusive) up to the end of the table. */
        const size_t entries_left = sizeof(bitmap->page_bits) / sizeof(row2_bitmap_page_unit_t) - page_idx;
        const uint32_t want_chunk = bitmap->page_bits[page_idx].chunk_id;
        const uint32_t want_vvol = bitmap->page_bits[page_idx].vvol_id;
        int run = 1;

        *chunk_id = want_chunk;
        *vvol_id = want_vvol;

        /* Extend the run while the next entry maps to the same place. */
        while ((size_t)run < entries_left) {
                uint32_t probe = page_idx + run;

                if (bitmap->page_bits[probe].chunk_id != want_chunk || bitmap->page_bits[probe].vvol_id != want_vvol)
                        break;

                run++;
        }

        *n_repeats = run;

        return 1;
}

/**
 * Point a run of pages at the given (chunk_id, vvol_id) pair.
 *
 * Every entry in [page_idx, page_idx + n_pages) is overwritten, whether or
 * not it already held the requested values.
 *
 * @param bitmap    bitmap unit to modify
 * @param page_idx  first page_bits index to update
 * @param n_pages   number of consecutive entries to update
 * @param chunk_id  chunk id to store
 * @param vvol_id   virtual-volume id to store
 * @return 1 if at least one entry held a different pair before the call,
 *         0 if nothing actually changed
 */
int row_bitmap_page_set_extents(row2_bitmap_unit_t *bitmap, uint32_t page_idx, uint32_t n_pages, uint32_t chunk_id, uint32_t vvol_id)
{
        int dirty = 0;

        for (uint32_t done = 0; done < n_pages; done++) {
                uint32_t slot = page_idx + done;

                if (!(bitmap->page_bits[slot].chunk_id == chunk_id && bitmap->page_bits[slot].vvol_id == vvol_id))
                        dirty = 1;

                bitmap->page_bits[slot].chunk_id = chunk_id;
                bitmap->page_bits[slot].vvol_id = vvol_id;
        }

        return dirty;
}

/**
 * Fragmentation statistics for one bitmap unit ("Rolling on Wind" algorithm).
 *
 * @param lsv_info   volume; its bitmap_context supplies the current vvol_id
 * @param bitmap     bitmap unit to analyse
 * @param n_frags    out: number of positions where an entry's
 *                   (chunk_id, vvol_id) differs from the previous entry's
 * @param own_count  out: number of entries attributed to the current vvol_id
 *
 * Both outputs are 0 when bitmap->owner is not set.
 *
 * NOTE(review): entry 0 is counted as owned on a vvol_id match alone, while
 * entries 1..N-1 additionally require a non-zero chunk_id. Preserved as-is;
 * confirm whether the asymmetry is intended.
 */
void row_bitmap_page_get_fragments(lsv_volume_proto_t *lsv_info, row2_bitmap_unit_t *bitmap, uint32_t *n_frags, uint32_t *own_count)
{
        lsv_bitmap_context_t *ctx = (lsv_bitmap_context_t *)lsv_info->bitmap_context;
        const size_t n_entries = sizeof(bitmap->page_bits) / sizeof(row2_bitmap_page_unit_t);
        uint32_t transitions = 0;
        uint32_t owned = (ctx->bitmap_header->vvol_id == bitmap->page_bits[0].vvol_id) ? 1 : 0;

        /* Mapping of the previous entry, seeded from entry 0. */
        uint32_t prev_chunk = bitmap->page_bits[0].chunk_id;
        uint32_t prev_vvol = bitmap->page_bits[0].vvol_id;

        if (!bitmap->owner) {
                *n_frags = 0;
                *own_count = 0;

                return;
        }

        for (size_t idx = 1; idx < n_entries; idx++) {
                const uint32_t cur_chunk = bitmap->page_bits[idx].chunk_id;
                const uint32_t cur_vvol = bitmap->page_bits[idx].vvol_id;

                if (!(cur_chunk == prev_chunk && cur_vvol == prev_vvol)) {
                        transitions++;

                        prev_chunk = cur_chunk;
                        prev_vvol = cur_vvol;
                }

                if (cur_chunk && cur_vvol == ctx->bitmap_header->vvol_id)
                        owned++;
        }

        *n_frags = transitions;
        *own_count = owned;
}

/**
 * Clear the owner flag of every bitmap unit stored in one chunk-sized buffer.
 *
 * @param dataptr  buffer of CHUNK_SIZE bytes, treated as a packed array of
 *                 row2_bitmap_unit_t
 */
void row2_data_cow_callback(uint8_t *dataptr)
{
        row2_bitmap_unit_t *units = (row2_bitmap_unit_t *)dataptr;
        const size_t n_units = CHUNK_SIZE / sizeof(row2_bitmap_unit_t);

        DBUG("row2_data_cow_callback enter\r\n");

        for (size_t idx = 0; idx < n_units; idx++)
                units[idx].owner = 0;
}

/**
 * @brief Read a page-aligned range of the volume into a flat buffer.
 *
 * The range is processed one chunk (CHUNK_SIZE) at a time. For each chunk the
 * bitmap unit is loaded and the requested pages are split into extents that
 * share one (chunk_id, vvol_id) mapping:
 *   - chunk_id == 0 (unit or page not allocated): the bytes are zero-filled;
 *   - vvol_id equals the current volume's vvol_id: lsv_bitmap_read_chunk();
 *   - any other vvol_id: translated with lsv_bitmap_vvol_to_vol() and read
 *     with volume_proto_remote_read_chunk().
 *
 * @param volume_proto  volume; volume_proto->table1.lsv_info is used
 * @param offset        start of the range, multiple of LSV_PAGE_SIZE
 * @param length        size of the range in bytes, multiple of LSV_PAGE_SIZE
 * @param buf           destination, at least length bytes
 * @return 0 on success, otherwise the non-zero code of the failing helper
 *
 * @note This function takes no locks itself; the callers in this file
 *       acquire the span locks around it.
 */
int row2_volume_proto_read_aligned(volume_proto_t *volume_proto, uint64_t offset, uint32_t length, uint8_t *buf)
{
        int ret = 0;
        int page_idx;
        uint32_t left, rlen = 0;
        uint64_t _offset = offset;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;

        chunked_bitmap_accessor_t accessor = {0};
        row2_bitmap_unit_t rbitmap = {0};
        row2_bitmap_unit_t *bitmap = &rbitmap;

        YASSERT(offset % LSV_PAGE_SIZE == 0 && length % LSV_PAGE_SIZE == 0);

        /* Debug-only nesting counter; a plain static, not synchronized. */
        static int enter = 0;

        DBUG("enter %u off %ju size %d\r\n", enter, offset, length);
        enter++;

        left = length;
        while (left > 0) {
                lsv_info->row2_stat.meta_read++;

#if USE_SPAN_LOCK
                ret = row_bitmap_chunked_read(lsv_info, round_down(_offset, CHUNK_SIZE), bitmap);
#else
                ret = row_bitmap_chunked_access(lsv_info, round_down(_offset, CHUNK_SIZE), &accessor);
                bitmap = accessor.bitmap;
#endif

                if (ret) {
                        row_access_end(&accessor);

                        goto end;// fatal.
                }

                /* In the accessor build bitmap may be NULL, so never dereference it unguarded. */
                DBUG("chunk_id=%u\r\n", bitmap ? bitmap->chunk_id : 0);

                if (!bitmap || bitmap->chunk_id == 0) {
                        /* Whole unit is unallocated: the rest of this chunk reads as zeros. */
                        rlen = _min(left, (CHUNK_SIZE - _offset % CHUNK_SIZE));

                        memset(buf + _offset - offset, 0, rlen);

                        _offset += rlen;
                        left -= rlen;

                        if (bitmap)
                                row_access_end(&accessor);

                        continue;
                }

                do {
                        uint32_t chunk_off = _offset % CHUNK_SIZE;
                        page_idx = chunk_off / LSV_PAGE_SIZE;
                        uint32_t current_vvol_id = lsv_bitmap_get_current_volume(lsv_info->bitmap_context)->bitmap_header->vvol_id;

                        uint32_t chunk_id;
                        uint32_t vvol_id = 0;
                        uint32_t n_repeats = 0;

                        row_bitmap_page_get_extents(bitmap, page_idx, &chunk_id, &vvol_id, &n_repeats);

                        /* Clamp to the request, the chunk boundary and the extent length. */
                        rlen = _min(left, (CHUNK_SIZE - _offset % CHUNK_SIZE));
                        rlen = _min(n_repeats * LSV_PAGE_SIZE, rlen);

                        if (!chunk_id)// not allocated.
                        {
                                memset(buf + _offset - offset, 0, rlen);

                                DBUG("read skip data, off=%ju, len = %u\r\n", _offset, rlen);
                        } else if (vvol_id == current_vvol_id) {
                                lsv_info->row2_stat.data_read++;

                                DBUG("read chunk, chunk_id=%u, chunk_off=%u, len = %u\r\n", chunk_id, chunk_off, rlen);

                                ret = lsv_bitmap_read_chunk(lsv_info, 0, chunk_id, chunk_off, rlen, (lsv_s8_t *)buf + _offset - offset);
                                if (ret) {
                                        DFATAL("read chunk error %d\r\n", ret);

                                        row_access_end(&accessor);
                                        goto end;
                                }
                        } else {
                                lsv_info->row2_stat.data_remote_read++;

                                /* Initialised so a lookup that leaves it untouched cannot pass garbage on. */
                                uint64_t vol_id = 0;

                                DBUG("remote read chunk, chunk_id=%u, chunk_off=%u, len = %u\r\n", chunk_id, chunk_off, rlen);

                                lsv_bitmap_vvol_to_vol(lsv_info, vvol_id, &vol_id);

                                ret = volume_proto_remote_read_chunk(lsv_info, vol_id, chunk_id, chunk_off, rlen, buf + _offset - offset);
                                if (ret) {
                                        DFATAL("remote read chunk error %d\r\n", ret);

                                        row_access_end(&accessor);
                                        goto end;
                                }
                        }

                        _offset += rlen;
                        left -= rlen;
                } while ((_offset % CHUNK_SIZE) != 0 && left > 0);

                row_access_end(&accessor);
        }

end:
        if (!ret) {
                if (gloconf.volume_crc) {
                        /* Optional verification in 512-byte units. */
                        for (uint32_t i = 0; i < length; i += 512)
                                check_bitmap_check(lsv_info->crc_context, offset + i, buf + i);
                }
        } else {
                DFATAL("row2_volume_proto_read_aligned error, ret %d\r\n", ret);
        }

        enter--;
        DBUG("row2_volume_proto_read_aligned leave %d\r\n", enter);

        return ret;
}

/**
 * Task body for row_bitmap_chunked_write_async().
 *
 * Unpacks the argument stack, runs row_bitmap_chunked_write(), releases the
 * lock that was handed over, stores the result through the caller-supplied
 * pointer and frees the stack.
 *
 * @param arg  stack_context_t * holding, in pop order: volume context,
 *             offset, bitmap buffer, lock, result pointer
 *
 * The pops are done in separate statements: C leaves the evaluation order of
 * function arguments unspecified, so popping inside the argument list could
 * hand the values to the wrong parameters.
 * NOTE(review): assumes stack_pop() returns values in the order they were
 * pushed, as the original call expression implied — confirm against stack.h.
 */
void row_bitmap_chunked_write_callback(void *arg)
{
        stack_context_t *stack = (stack_context_t *)arg;

        void *volume_context = stack_pop(stack);
        uint64_t off = (uint64_t)(uintptr_t)stack_pop(stack);
        struct row2_bitmap_unit *bitmap_buf = (struct row2_bitmap_unit *)stack_pop(stack);

        int ret = row_bitmap_chunked_write(volume_context, off, bitmap_buf);

        lsv_unlock((lsv_lock_t *)stack_pop(stack));

        *((int *)stack_pop(stack)) = ret;

        xfree(stack);
}

/**
 * Schedule a bitmap-unit write as a separate task.
 *
 * The arguments are packed into a heap-allocated stack that is passed to
 * row_bitmap_chunked_write_callback(); the callback frees that stack.
 *
 * @param volume_context  forwarded to row_bitmap_chunked_write()
 * @param off             forwarded to row_bitmap_chunked_write()
 * @param bitmap_buf      forwarded to row_bitmap_chunked_write()
 * @param lock            unlocked by the callback after the write
 * @param ret             receives the write's return code from the callback
 * @return always 0
 */
int row_bitmap_chunked_write_async(void *volume_context, uint64_t off, struct row2_bitmap_unit *bitmap_buf, lsv_lock_t *lock, int *ret)
{
        /* Listed in the order the callback consumes them. */
        void *const task_args[] = {volume_context, (void *)off, bitmap_buf, lock, ret};
        stack_context_t *stack = xmalloc(sizeof(stack_context_t));

        stack_init(stack);

        for (size_t idx = 0; idx < sizeof(task_args) / sizeof(task_args[0]); idx++)
                stack_push(stack, task_args[idx]);

        schedule_task_new("row_bitmap_chunked_write", row_bitmap_chunked_write_callback, stack, -1);

        return 0;
}

/**
 * Task body that performs a chunk data write from a packed argument stack.
 *
 * Unpacks the arguments, runs lsv_bitmap_write_chunk(), releases the lock
 * that was handed over, stores the result through the caller-supplied
 * pointer and frees the stack.
 *
 * @param arg  stack_context_t * holding, in pop order: volume context,
 *             chunk id, offset inside the chunk, length, data buffer, lock,
 *             result pointer
 *
 * The pops are done in separate statements: C leaves the evaluation order of
 * function arguments unspecified, so popping inside the argument list could
 * hand the values to the wrong parameters.
 * NOTE(review): assumes stack_pop() returns values in the order they were
 * pushed, as the original call expression implied — confirm against stack.h.
 */
void row_bitmap_write_chunk_callback(void *arg)
{
        stack_context_t *stack = (stack_context_t *)arg;

        void *volume_context = stack_pop(stack);
        uint32_t chunk_id = (uint32_t)(uintptr_t)stack_pop(stack);
        uint32_t chunk_off = (uint32_t)(uintptr_t)stack_pop(stack);
        uint32_t len = (uint32_t)(uintptr_t)stack_pop(stack);
        void *buf = stack_pop(stack);

        int ret = lsv_bitmap_write_chunk(volume_context, chunk_id, chunk_off, len, buf);

        lsv_unlock((lsv_lock_t *)stack_pop(stack));

        *((int *)stack_pop(stack)) = ret;

        xfree(stack);
}

/**
 * Schedule a chunk data write as a separate task.
 *
 * The arguments are packed into a heap-allocated stack in exactly the order
 * row_bitmap_write_chunk_callback() pops them; the callback frees the stack.
 *
 * Fixes over the previous version: the data buffer is now pushed (it used to
 * be dropped, leaving the callback one argument short), and the task runs
 * row_bitmap_write_chunk_callback() instead of the bitmap-unit callback,
 * which would have misinterpreted these arguments.
 *
 * @param volume_context  forwarded to lsv_bitmap_write_chunk()
 * @param chunk_id        chunk to write
 * @param chunk_off       byte offset inside the chunk
 * @param len             number of bytes to write
 * @param buf             data to write; must stay valid until the task ran
 * @param lock            unlocked by the callback after the write
 * @param ret             receives the write's return code from the callback
 * @return always 0
 */
int row_bitmap_write_chunk_async(void *volume_context, uint32_t chunk_id, uint32_t chunk_off, uint32_t len, void *buf, lsv_lock_t *lock, int *ret)
{
        stack_context_t *stack = xmalloc(sizeof(stack_context_t));

        stack_init(stack);

        stack_push(stack, volume_context);
        stack_push(stack, (void *)(uintptr_t)chunk_id);
        stack_push(stack, (void *)(uintptr_t)chunk_off);
        stack_push(stack, (void *)(uintptr_t)len);
        stack_push(stack, buf);
        stack_push(stack, lock);
        stack_push(stack, ret);

        schedule_task_new("row_bitmap_write_chunk_async", row_bitmap_write_chunk_callback, stack, -1);

        return 0;
}

/**
 * @brief Write a page-aligned range from a flat buffer into the volume.
 *
 * The range is processed one chunk (CHUNK_SIZE) at a time. For each chunk:
 *   1. the bitmap unit is loaded;
 *   2. a data chunk is obtained if needed: lsv_volume_chunk_malloc() when the
 *      unit has no chunk yet, lsv_bitmap_alloc_chunk() when the unit has a
 *      chunk but its owner flag is clear;
 *   3. the written pages are pointed at (chunk_id, current vvol_id) and the
 *      data is written with lsv_bitmap_write_data();
 *   4. the bitmap unit is written back if anything in it changed.
 *
 * @param volume_proto  volume; volume_proto->table1.lsv_info is used
 * @param offset        start of the range, multiple of LSV_PAGE_SIZE
 * @param length        size of the range in bytes, multiple of LSV_PAGE_SIZE
 * @param buf           source data, at least length bytes
 * @return 0 on success, otherwise the non-zero code of the failing helper
 *
 * @note This function takes no locks itself; the callers in this file
 *       acquire the span locks around it.
 */
int row2_volume_proto_write_aligned(volume_proto_t *volume_proto, uint64_t offset, uint32_t length, uint8_t *buf)
{
        int ret = 0;
        int page_idx;
        int bitmap_changed = 0;
        uint32_t left, wlen = 0;
        uint64_t old_off;
        uint64_t _offset;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;

        YASSERT(offset % LSV_PAGE_SIZE == 0 && length % LSV_PAGE_SIZE == 0);
        assert(sizeof(row2_bitmap_unit_t) == 2048);

        /* Debug-only nesting counter; a plain static, not synchronized. */
        static int enter = 0;
        enter++;
        DBUG("enter %u offset %ju size %d\r\n", enter, offset, length);

        _offset = offset;
        left = length;
        while (left) {
                lsv_info->row2_stat.meta_read++;

                chunked_bitmap_accessor_t accessor = {0};
                row2_bitmap_unit_t rbitmap = {0};
                row2_bitmap_unit_t *bitmap = &rbitmap;

#if USE_SPAN_LOCK
                ret = row_bitmap_chunked_read(lsv_info, round_down(_offset, CHUNK_SIZE), bitmap);
#else
                ret = row_bitmap_chunked_access(lsv_info, round_down(_offset, CHUNK_SIZE), &accessor);
                bitmap = accessor.bitmap;
#endif

                if (ret) {
                        goto end;// fatal.
                }

                /* In the accessor build bitmap may be NULL: fall back to the
                 * zeroed local unit before the first dereference. */
                if (!bitmap)
                        bitmap = &rbitmap;

                DBUG("chunk_id=%u\r\n", bitmap->chunk_id);

                if (bitmap->chunk_id == 0)// not allocated.
                {
                        lsv_info->row2_stat.data_malloc++;

                        ret = lsv_volume_chunk_malloc(lsv_info, LSV_LOG_LOG_STORAGE_TYPE, &bitmap->chunk_id);
                        if (unlikely(ret)) {
                                row_access_end(&accessor);
                                GOTO(end, ret);
                        }

                        DBUG("new_chunk_id=%u\r\n", bitmap->chunk_id);

                        // TODO: when snapshots exist the bitmap is updated page by page;
                        // with 4K random writes this causes 2x write amplification.
                        if (!lsv_bitmap_has_snapshot(lsv_info)) {
                                /* No snapshot: claim every page of the new chunk up front. */
                                uint32_t vvol_id = lsv_bitmap_get_current_volume(lsv_info->bitmap_context)->bitmap_header->vvol_id;

                                DBUG("initialize full chunk, chunk_id=%u, vvol_id=%u\r\n", bitmap->chunk_id, vvol_id);
                                row_bitmap_page_set_extents(bitmap, 0, CHUNK_SIZE / LSV_PAGE_SIZE, bitmap->chunk_id, vvol_id);
                        }

                        bitmap->owner = 1;
                        bitmap_changed = 1;
                } else if (!bitmap->owner) {
                        lsv_info->row2_stat.data_malloc++;

                        ret = lsv_bitmap_alloc_chunk(lsv_info, &bitmap->chunk_id, 0);// zero the buf.
                        if (unlikely(ret)) {
                                row_access_end(&accessor);
                                GOTO(end, ret);
                        }

                        DBUG("allocate chunk on snapshot, new_chunk_id=%u\r\n", bitmap->chunk_id);

                        bitmap->owner = 1;
                        bitmap_changed = 1;
                } else {
                        DBUG("update chunk on snapshot, chunk_id=%u\r\n", bitmap->chunk_id);
                }

                /* Remember where this chunk's write started; used to address
                 * the bitmap unit after _offset has moved on. */
                old_off = _offset;

                /* Fragment statistics are currently disabled (the call to
                 * row_bitmap_page_get_fragments() is switched off), so both
                 * stay 0 and the consolidation branch below is never taken. */
                uint32_t frags = 0, owned = 0;
                if (frags > 64 || (frags > 16 && owned > 128)) {
                        /* Consolidate: read the whole chunk, overlay the new
                         * data, and rewrite it as one owned chunk. */
                        uint8_t *chunk_buf = xmalloc(CHUNK_SIZE);
                        uint32_t vvol_id = lsv_bitmap_get_current_volume(lsv_info->bitmap_context)->bitmap_header->vvol_id;

                        ret = row2_volume_proto_read_aligned(volume_proto, round_down(_offset, CHUNK_SIZE), CHUNK_SIZE, chunk_buf);
                        if (unlikely(ret)) {
                                xfree(chunk_buf);

                                row_access_end(&accessor);
                                GOTO(end, ret);
                        }

                        wlen = _min(left, (CHUNK_SIZE - _offset % CHUNK_SIZE));
                        memcpy(chunk_buf + _offset % CHUNK_SIZE, buf + _offset - offset, wlen);

                        int change = row_bitmap_page_set_extents(bitmap, 0, CHUNK_SIZE / LSV_PAGE_SIZE, bitmap->chunk_id, vvol_id);
                        if (change) {
                                bitmap_changed = 1;
                        }

                        ret = row2_volume_proto_write_aligned(volume_proto, round_down(_offset, CHUNK_SIZE), CHUNK_SIZE, chunk_buf);
                        if (unlikely(ret)) {
                                xfree(chunk_buf);

                                row_access_end(&accessor);
                                GOTO(end, ret);
                        }

                        xfree(chunk_buf);
                        DFATAL("rolling on the wind, frags = %d, chunk_id=%d, owned=%d...\r\n", frags, bitmap->chunk_id, owned);

                        /* Consume the bytes handled here; without this the
                         * outer loop would never terminate. */
                        _offset += wlen;
                        left -= wlen;
                } else {
                        do {
                                uint32_t chunk_off = _offset % CHUNK_SIZE;
                                page_idx = chunk_off / LSV_PAGE_SIZE;
                                wlen = _min(left, (CHUNK_SIZE - _offset % CHUNK_SIZE));

                                uint32_t vvol_id = lsv_bitmap_get_current_volume(lsv_info->bitmap_context)->bitmap_header->vvol_id;
                                int change = row_bitmap_page_set_extents(bitmap, page_idx, wlen / LSV_PAGE_SIZE, bitmap->chunk_id, vvol_id);
                                if (change) {
                                        bitmap_changed = 1;
                                }

                                lsv_info->row2_stat.data_write++;

                                // TODO: write the data directly to HDD
                                ret = lsv_bitmap_write_data(lsv_info, bitmap->chunk_id, chunk_off, wlen, buf + _offset - offset);
                                if (unlikely(ret)) {
                                        row_access_end(&accessor);
                                        GOTO(end, ret);
                                }

                                DBUG("write chunk, chunk_id=%u, chunk_off=%u, len = %u\r\n", bitmap->chunk_id, chunk_off, wlen);

                                _offset += wlen;
                                left -= wlen;
                        } while ((_offset % CHUNK_SIZE) != 0 && left > 0);
                }

                if (bitmap_changed) {
                        ret = row_bitmap_chunked_write(lsv_info, round_down(old_off, CHUNK_SIZE), bitmap);
                        if (unlikely(ret)) {
                                /* Release the accessor here too, like every
                                 * other error path after the unit was loaded. */
                                row_access_end(&accessor);
                                GOTO(end, ret);
                        }

                        bitmap_changed = 0;
                }

                row_access_end(&accessor);
        }

end:

        if (!ret) {
                if (gloconf.volume_crc) {
                        /* Optional bookkeeping in 512-byte units. */
                        for (uint32_t i = 0; i < length; i += 512)
                                check_bitmap_set(lsv_info->crc_context, offset + i, buf + i);
                }

        } else {
                DFATAL("error %d\r\n", ret);
        }

        enter--;
        DBUG("leave %d\r\n", enter);

        return ret;
}

#define LOCK_GRANULARITY (2 * CHUNK_SIZE)

/**
 * Read-lock every LOCK_GRANULARITY-sized slot touched by [off, off + len).
 *
 * Slots are keyed in lsv_info->lock_table by their start offset and are
 * locked in ascending order.
 *
 * @param volume_proto  volume whose lock table is used
 * @param off           start of the byte range
 * @param len           length of the byte range; 0 locks nothing
 */
void row2_span_rdlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint64_t cursor = off;
        uint32_t remaining = len;

        while (remaining != 0) {
                /* Part of the request that lies inside the current slot. */
                int step = _min(remaining, (LOCK_GRANULARITY - cursor % LOCK_GRANULARITY));

                ltable_rdlock(&lsv_info->lock_table, round_down(cursor, LOCK_GRANULARITY));

                cursor += step;
                remaining -= step;
        }
}

/**
 * Write-lock every LOCK_GRANULARITY-sized slot touched by [off, off + len).
 *
 * Slots are keyed in lsv_info->lock_table by their start offset and are
 * locked in ascending order.
 *
 * @param volume_proto  volume whose lock table is used
 * @param off           start of the byte range
 * @param len           length of the byte range; 0 locks nothing
 */
void row2_span_wrlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint64_t cursor = off;
        uint32_t remaining = len;

        while (remaining != 0) {
                /* Part of the request that lies inside the current slot. */
                int step = _min(remaining, (LOCK_GRANULARITY - cursor % LOCK_GRANULARITY));

                ltable_wrlock(&lsv_info->lock_table, round_down(cursor, LOCK_GRANULARITY));

                cursor += step;
                remaining -= step;
        }
}

/**
 * Unlock every LOCK_GRANULARITY-sized slot touched by [off, off + len).
 *
 * Counterpart of row2_span_rdlock() / row2_span_wrlock(); it walks the same
 * slots in the same ascending order.
 *
 * @param volume_proto  volume whose lock table is used
 * @param off           start of the byte range
 * @param len           length of the byte range; 0 unlocks nothing
 */
void row2_span_unlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint64_t cursor = off;
        uint32_t remaining = len;

        while (remaining != 0) {
                /* Part of the request that lies inside the current slot. */
                int step = _min(remaining, (LOCK_GRANULARITY - cursor % LOCK_GRANULARITY));

                ltable_unlock(&lsv_info->lock_table, round_down(cursor, LOCK_GRANULARITY));

                cursor += step;
                remaining -= step;
        }
}

#define COW_LOCK_GRANULARITY (512 * CHUNK_SIZE)

/**
 * Read-lock every COW_LOCK_GRANULARITY-sized slot of the bitmap COW lock
 * table (lsv_info->bitmap_cow_lt) touched by [off, off + len).
 *
 * Compiles to a no-op when USE_SPAN_LOCK is 0.
 *
 * @param volume_proto  volume whose bitmap COW lock table is used
 * @param off           start of the byte range
 * @param len           length of the byte range; 0 locks nothing
 */
void row2_bitmap_span_rdlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
#if USE_SPAN_LOCK
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint64_t cursor = off;
        uint32_t remaining = len;

        while (remaining != 0) {
                /* Part of the request that lies inside the current slot. */
                int step = _min(remaining, (COW_LOCK_GRANULARITY - cursor % COW_LOCK_GRANULARITY));

                ltable_rdlock(&lsv_info->bitmap_cow_lt, round_down(cursor, COW_LOCK_GRANULARITY));

                cursor += step;
                remaining -= step;
        }
#else
        (void)volume_proto;
        (void)off;
        (void)len;
#endif
}

/**
 * Write-lock every COW_LOCK_GRANULARITY-sized slot of the bitmap COW lock
 * table (lsv_info->bitmap_cow_lt) touched by [off, off + len).
 *
 * Compiles to a no-op when USE_SPAN_LOCK is 0.
 *
 * @param volume_proto  volume whose bitmap COW lock table is used
 * @param off           start of the byte range
 * @param len           length of the byte range; 0 locks nothing
 */
void row2_bitmap_span_wrlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
#if USE_SPAN_LOCK
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint64_t cursor = off;
        uint32_t remaining = len;

        while (remaining != 0) {
                /* Part of the request that lies inside the current slot. */
                int step = _min(remaining, (COW_LOCK_GRANULARITY - cursor % COW_LOCK_GRANULARITY));

                ltable_wrlock(&lsv_info->bitmap_cow_lt, round_down(cursor, COW_LOCK_GRANULARITY));

                cursor += step;
                remaining -= step;
        }
#else
        (void)volume_proto;
        (void)off;
        (void)len;
#endif
}

/**
 * Unlock every COW_LOCK_GRANULARITY-sized slot of the bitmap COW lock table
 * (lsv_info->bitmap_cow_lt) touched by [off, off + len).
 *
 * Counterpart of row2_bitmap_span_rdlock() / row2_bitmap_span_wrlock().
 * Compiles to a no-op when USE_SPAN_LOCK is 0.
 *
 * @param volume_proto  volume whose bitmap COW lock table is used
 * @param off           start of the byte range
 * @param len           length of the byte range; 0 unlocks nothing
 */
void row2_bitmap_span_unlock(volume_proto_t *volume_proto, uint64_t off, uint32_t len)
{
#if USE_SPAN_LOCK
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint64_t cursor = off;
        uint32_t remaining = len;

        while (remaining != 0) {
                /* Part of the request that lies inside the current slot. */
                int step = _min(remaining, (COW_LOCK_GRANULARITY - cursor % COW_LOCK_GRANULARITY));

                ltable_unlock(&lsv_info->bitmap_cow_lt, round_down(cursor, COW_LOCK_GRANULARITY));

                cursor += step;
                remaining -= step;
        }
#else
        (void)volume_proto;
        (void)off;
        (void)len;
#endif
}

/**
 * @brief Serve a read request: locate the data through the bitmap, then read
 *        it. The extents reported by the bitmap may be non-contiguous and
 *        could be fetched in parallel.
 *
 * The request is widened to LSV_PAGE_SIZE boundaries when it is not already
 * aligned, read into a temporary buffer under the io lock and both span
 * locks, and the requested window of that buffer is attached to @p buf.
 *
 * @note In ServerSAN deployments a read cache has a lot of potential.
 * @note In HCI deployments a read cache is effective.
 *
 * @param volume_proto  volume to read from
 * @param io            request; offset and size are used
 * @param buf           receives io->size bytes
 * @return 0 on success, otherwise the error from the aligned read
 *
 * NOTE(review): on success the temporary buffer is not freed here;
 * presumably mbuffer_attach() takes ownership of it — confirm.
 */
int row2_volume_proto_read(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)
{
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        uint64_t span_off = io->offset;
        uint32_t span_len = io->size;
        uint8_t *bounce;
        int ret;

        lsv_rdlock(&lsv_info->io_lock);// lock for snapshots, resizing etc.

        if ((io->offset % LSV_PAGE_SIZE) != 0 || (io->size % LSV_PAGE_SIZE) != 0) {
                /* Widen the window to whole pages. */
                range_align(LSV_PAGE_SIZE, io->offset, io->size, &span_off, &span_len);
        } else {
                lsv_info->row2_stat.align_read++;
        }

        bounce = xmalloc(span_len);

        row2_span_rdlock(volume_proto, span_off, span_len);
        row2_bitmap_span_rdlock(volume_proto, span_off, span_len);

        ret = row2_volume_proto_read_aligned(volume_proto, span_off, span_len, bounce);
        if (unlikely(ret)) {
                xfree(bounce);
                GOTO(un_lock, ret);
        }

        /* Expose only the bytes the caller asked for. */
        mbuffer_attach(buf, (char *)bounce + io->offset - span_off, io->size, bounce);

        YASSERT(buf->len == io->size);

un_lock:
        row2_bitmap_span_unlock(volume_proto, span_off, span_len);
        row2_span_unlock(volume_proto, span_off, span_len);
        lsv_unlock(&lsv_info->io_lock);
        return ret;
}

/**
 * @brief Serve a write request: locate the current write point through the
 *        bitmap, then write to the volume.
 *
 * Three paths, counted separately for the debug log:
 *   - aligned request held in more than one buffer segment: the data is
 *     gathered into a temporary buffer and written;
 *   - aligned request in a single segment: written straight from the segment;
 *   - unaligned request: the range is widened to LSV_PAGE_SIZE boundaries,
 *     the existing content is read (only the first/last page when the
 *     widened range exceeds 16 KiB, everything otherwise), the new bytes are
 *     overlaid, and the widened range is written back.
 *
 * The range is page-aligned before any span lock is taken, so the ranges
 * passed to the lock and unlock helpers are always identical (previously the
 * locks were taken on the raw range and released on the aligned one).
 *
 * @param volume_proto  volume to write to
 * @param io            request; offset and size are used
 * @param buf           source data, io->size bytes
 * @return 0 on success, otherwise the error from the aligned read or write
 */

int row2_volume_proto_write(volume_proto_t *volume_proto, const io_t *io, const buffer_t *buf)
{
        int ret = 0;
        uint64_t off_new = io->offset;
        uint32_t len_new = io->size;
        uint8_t *_buffer;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        const int aligned = (io->offset % LSV_PAGE_SIZE) == 0 && (io->size % LSV_PAGE_SIZE) == 0;

        /* Debug-only path counters; plain statics, not synchronized. */
        static uint64_t n_gather_writes = 0;
        static uint64_t n_direct_writes = 0;
        static uint64_t n_unaligned_writes = 0;

        lsv_rdlock(&lsv_info->io_lock);// lock for snapshots, resizing etc.

        if (!aligned)
                range_align(LSV_PAGE_SIZE, io->offset, io->size, &off_new, &len_new);

        // Guards concurrent access to the same bitmap page (4K = 2 x 2K units).
        // One bitmap unit is 2K and maps 1M of LBA space, so a 4K page maps
        // 2M of LBA space.
        row2_span_wrlock(volume_proto, off_new, len_new);

        if (row_bitmap_chunked_will_cow(lsv_info, off_new, len_new))
                row2_bitmap_span_wrlock(volume_proto, off_new, len_new);
        else
                row2_bitmap_span_rdlock(volume_proto, off_new, len_new);

        if (aligned) {
                lsv_info->row2_stat.align_write++;

                if (mbuffer_segcount((buffer_t *)buf) > 1) {
                        n_gather_writes++;

                        _buffer = xmalloc(len_new);

                        mbuffer_get(buf, _buffer, io->size);

                        ret = row2_volume_proto_write_aligned(volume_proto, off_new, len_new, _buffer);

                        xfree(_buffer);
                } else {
                        n_direct_writes++;

                        struct list_head *pos, *n;
                        seg_t *seg;

                        /* Only the first segment is used; the loop body runs at most once. */
                        list_for_each_safe(pos, n, &buf->list)
                        {
                                seg = (seg_t *)pos;

                                ret = row2_volume_proto_write_aligned(volume_proto, off_new, len_new, seg->handler.ptr);

                                break;
                        }
                }
        } else {
                n_unaligned_writes++;

                _buffer = xmalloc(len_new);

                if (len_new > 16 * 1024) {
                        /* Large request: only the partially overwritten first
                         * and last pages need their old content. */
                        if (io->offset % LSV_PAGE_SIZE != 0)
                                ret = row2_volume_proto_read_aligned(volume_proto, off_new, LSV_PAGE_SIZE, _buffer);

                        if (!ret && (io->offset + io->size) % LSV_PAGE_SIZE != 0)
                                ret = row2_volume_proto_read_aligned(volume_proto, off_new + len_new - LSV_PAGE_SIZE, LSV_PAGE_SIZE,
                                                                     _buffer + len_new - LSV_PAGE_SIZE);
                } else {
                        ret = row2_volume_proto_read_aligned(volume_proto, off_new, len_new, _buffer);
                }

                if (unlikely(ret)) {
                        xfree(_buffer);

                        goto un_lock;
                }

                /* Overlay the caller's bytes at their position inside the widened range. */
                mbuffer_get(buf, _buffer + io->offset - off_new, io->size);

                ret = row2_volume_proto_write_aligned(volume_proto, off_new, len_new, _buffer);

                xfree(_buffer);
        }

un_lock:
        DBUG("count %ju %ju %ju\n", n_gather_writes, n_direct_writes, n_unaligned_writes);

        /* Release in reverse order of acquisition. */
        row2_bitmap_span_unlock(volume_proto, off_new, len_new);
        row2_span_unlock(volume_proto, off_new, len_new);

        lsv_unlock(&lsv_info->io_lock);

        return ret;
}

/**
 * Test whether every page in [page_idx, page_idx + n_pages) has its bit set.
 *
 * Bit k of the map lives in page_bits[k / 8], at bit position k % 8.
 *
 * @param bitmap    ROW1 bitmap unit
 * @param page_idx  first page to test
 * @param n_pages   number of consecutive pages to test
 * @return 1 if all tested bits are set (also when n_pages is 0), else 0
 */
static inline int row1_is_bitmap_page_valid(row1_bitmap_unit_t *bitmap, uint32_t page_idx, uint32_t n_pages)
{
        for (uint32_t done = 0; done < n_pages; done++) {
                uint32_t page = page_idx + done;

                if ((bitmap->page_bits[page / 8] & (1 << (page % 8))) == 0)
                        return 0;
        }

        return 1;
}

/**
 * Report the bit of one page and how long the run of equal bits is.
 *
 * Bit k of the map lives in page_bits[k / 8], at bit position k % 8.
 *
 * @param bitmap     ROW1 bitmap unit
 * @param page_idx   first page of the run; must be inside the map
 * @param n_repeats  out: number of consecutive pages, starting at page_idx,
 *                   whose bit equals the bit of page_idx; at least 1
 * @return 1 if the bit of page_idx is set, 0 if it is clear
 */
static inline int row1_is_bitmap_page_valid_repeats(row1_bitmap_unit_t *bitmap, uint32_t page_idx, uint32_t *n_repeats)
{
        /* Pages from page_idx (inclusive) up to the end of the map. */
        const size_t pages_left = sizeof(bitmap->page_bits) * 8 - page_idx;
        const int first_set = (bitmap->page_bits[page_idx / 8] & (1 << (page_idx % 8))) != 0;
        int run = 1;

        /* Extend the run while the next page has the same bit value. */
        while ((size_t)run < pages_left) {
                uint32_t probe = page_idx + run;
                int probe_set = (bitmap->page_bits[probe / 8] & (1 << (probe % 8))) != 0;

                if (probe_set != first_set)
                        break;

                run++;
        }

        *n_repeats = run;

        return first_set;
}

/**
 * Set the bits of the pages in [page_idx, page_idx + n_pages).
 *
 * Bit k of the map lives in page_bits[k / 8], at bit position k % 8.
 *
 * @param bitmap    ROW1 bitmap unit to modify
 * @param page_idx  first page to mark
 * @param n_pages   number of consecutive pages to mark
 */
void row1_bitmap_page_mark_valid(row1_bitmap_unit_t *bitmap, uint32_t page_idx, uint32_t n_pages)
{
        for (uint32_t done = 0; done < n_pages; done++) {
                uint32_t page = page_idx + done;

                bitmap->page_bits[page / 8] |= 1 << (page % 8);
        }
}

/**
 * Clear the bits of the pages in [page_idx, page_idx + n_pages).
 *
 * Bit k of the map lives in page_bits[k / 8], at bit position k % 8.
 *
 * @param bitmap    ROW1 bitmap unit to modify
 * @param page_idx  first page to clear
 * @param n_pages   number of consecutive pages to clear
 */
void row1_bitmap_page_clear_valid(row1_bitmap_unit_t *bitmap, uint32_t page_idx, uint32_t n_pages)
{
        for (uint32_t done = 0; done < n_pages; done++) {
                uint32_t page = page_idx + done;

                bitmap->page_bits[page / 8] &= ~(1 << (page % 8));
        }
}
